import json
import random
import re
import string
import threading
import time
from multiprocessing.dummy import Pool

import pytest

from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster

cluster = ClickHouseCluster(__file__)

node1 = cluster.add_instance(
    "node1",
    main_configs=[
        "configs/logs_config.xml",
        "configs/config.d/storage_configuration.xml",
        "configs/config.d/cluster.xml",
    ],
    with_zookeeper=True,
    stay_alive=True,
    tmpfs=["/jbod1:size=40M", "/jbod2:size=40M", "/external:size=200M"],
    macros={"shard": 0, "replica": 1},
)

node2 = cluster.add_instance(
    "node2",
    main_configs=[
        "configs/logs_config.xml",
        "configs/config.d/storage_configuration.xml",
        "configs/config.d/cluster.xml",
    ],
    with_zookeeper=True,
    stay_alive=True,
    tmpfs=["/jbod1:size=40M", "/jbod2:size=40M", "/external:size=200M"],
    macros={"shard": 0, "replica": 2},
)


@pytest.fixture(scope="module")
def start_cluster():
    try:
        cluster.start()
        yield cluster

    finally:
        cluster.shutdown()


def get_oldest_part(node, table_name):
    return node.query(
        f"SELECT name FROM system.parts WHERE table = '{table_name}' and active = 1 ORDER BY modification_time LIMIT 1"
    ).strip()


def get_disk_for_part(node, table_name, part):
    return node.query(
        f"SELECT disk_name FROM system.parts WHERE table == '{table_name}' and active = 1 and name = '{part}' ORDER BY modification_time"
    ).strip()


def test_system_tables(start_cluster):
    expected_disks_data = [
        {
            "name": "default",
            "path": "/var/lib/clickhouse/",
            "keep_free_space": 1024,
        },
        {
            "name": "jbod1",
            "path": "/jbod1/",
            "keep_free_space": 0,
        },
        {
            "name": "jbod2",
            "path": "/jbod2/",
            "keep_free_space": 10485760,
        },
        {
            "name": "external",
            "path": "/external/",
            "keep_free_space": 0,
        },
    ]
    if node1.with_remote_database_disk:
        db_disk_path = node1.query(
            "SELECT path FROM system.disks WHERE name='disk_db_remote'"
        ).strip()
        expected_disks_data.append(
            {
                "name": "disk_db_remote",
                "path": f"{db_disk_path}",
                "keep_free_space": 0,
            }
        )

    click_disk_data = json.loads(
        node1.query("SELECT name, path, keep_free_space FROM system.disks FORMAT JSON")
    )["data"]
    assert sorted(click_disk_data, key=lambda x: x["name"]) == sorted(
        expected_disks_data, key=lambda x: x["name"]
    )

    expected_policies_data = [
        {
            "policy_name": "small_jbod_with_external",
            "volume_name": "main",
            "volume_priority": 1,
            "disks": ["jbod1"],
            "volume_type": "JBOD",
            "max_data_part_size": 0,
            "move_factor": 0.1,
            "prefer_not_to_merge": 0,
            "perform_ttl_move_on_insert": 1,
            "load_balancing": "ROUND_ROBIN",
        },
        {
            "policy_name": "small_jbod_with_external",
            "volume_name": "external",
            "volume_priority": 2,
            "disks": ["external"],
            "volume_type": "JBOD",
            "max_data_part_size": 0,
            "move_factor": 0.1,
            "prefer_not_to_merge": 0,
            "perform_ttl_move_on_insert": 1,
            "load_balancing": "ROUND_ROBIN",
        },
        {
            "policy_name": "small_jbod_with_external_no_merges",
            "volume_name": "main",
            "volume_priority": 1,
            "disks": ["jbod1"],
            "volume_type": "JBOD",
            "max_data_part_size": 0,
            "move_factor": 0.1,
            "prefer_not_to_merge": 0,
            "perform_ttl_move_on_insert": 1,
            "load_balancing": "ROUND_ROBIN",
        },
        {
            "policy_name": "small_jbod_with_external_no_merges",
            "volume_name": "external",
            "volume_priority": 2,
            "disks": ["external"],
            "volume_type": "JBOD",
            "max_data_part_size": 0,
            "move_factor": 0.1,
            "prefer_not_to_merge": 1,
            "perform_ttl_move_on_insert": 1,
            "load_balancing": "ROUND_ROBIN",
        },
        {
            "policy_name": "one_more_small_jbod_with_external",
            "volume_name": "m",
            "volume_priority": 1,
            "disks": ["jbod1"],
            "volume_type": "JBOD",
            "max_data_part_size": 0,
            "move_factor": 0.1,
            "prefer_not_to_merge": 0,
            "perform_ttl_move_on_insert": 1,
            "load_balancing": "ROUND_ROBIN",
        },
        {
            "policy_name": "one_more_small_jbod_with_external",
            "volume_name": "e",
            "volume_priority": 2,
            "disks": ["external"],
            "volume_type": "JBOD",
            "max_data_part_size": 0,
            "move_factor": 0.1,
            "prefer_not_to_merge": 0,
            "perform_ttl_move_on_insert": 1,
            "load_balancing": "ROUND_ROBIN",
        },
        {
            "policy_name": "jbods_with_external",
            "volume_name": "main",
            "volume_priority": 1,
            "disks": ["jbod1", "jbod2"],
            "volume_type": "JBOD",
            "max_data_part_size": 10485760,
            "move_factor": 0.1,
            "prefer_not_to_merge": 0,
            "perform_ttl_move_on_insert": 1,
            "load_balancing": "ROUND_ROBIN",
        },
        {
            "policy_name": "jbods_with_external",
            "volume_name": "external",
            "volume_priority": 2,
            "disks": ["external"],
            "volume_type": "JBOD",
            "max_data_part_size": 0,
            "move_factor": 0.1,
            "prefer_not_to_merge": 0,
            "perform_ttl_move_on_insert": 1,
            "load_balancing": "ROUND_ROBIN",
        },
        {
            "policy_name": "moving_jbod_with_external",
            "volume_name": "main",
            "volume_priority": 1,
            "disks": ["jbod1"],
            "volume_type": "JBOD",
            "max_data_part_size": 0,
            "move_factor": 0.7,
            "prefer_not_to_merge": 0,
            "perform_ttl_move_on_insert": 1,
            "load_balancing": "ROUND_ROBIN",
        },
        {
            "policy_name": "moving_jbod_with_external",
            "volume_name": "external",
            "volume_priority": 2,
            "disks": ["external"],
            "volume_type": "JBOD",
            "max_data_part_size": 0,
            "move_factor": 0.7,
            "prefer_not_to_merge": 0,
            "perform_ttl_move_on_insert": 1,
            "load_balancing": "ROUND_ROBIN",
        },
        {
            "policy_name": "default_disk_with_external",
            "volume_name": "small",
            "volume_priority": 1,
            "disks": ["default"],
            "volume_type": "JBOD",
            "max_data_part_size": 2097152,
            "move_factor": 0.1,
            "prefer_not_to_merge": 0,
            "perform_ttl_move_on_insert": 1,
            "load_balancing": "ROUND_ROBIN",
        },
        {
            "policy_name": "default_disk_with_external",
            "volume_name": "big",
            "volume_priority": 2,
            "disks": ["external"],
            "volume_type": "JBOD",
            "max_data_part_size": 20971520,
            "move_factor": 0.1,
            "prefer_not_to_merge": 0,
            "perform_ttl_move_on_insert": 1,
            "load_balancing": "ROUND_ROBIN",
        },
        {
            "policy_name": "special_warning_policy",
            "volume_name": "special_warning_zero_volume",
            "volume_priority": 1,
            "disks": ["default"],
            "volume_type": "JBOD",
            "max_data_part_size": 0,
            "move_factor": 0.1,
            "prefer_not_to_merge": 0,
            "perform_ttl_move_on_insert": 1,
            "load_balancing": "ROUND_ROBIN",
        },
        {
            "policy_name": "special_warning_policy",
            "volume_name": "special_warning_default_volume",
            "volume_priority": 2,
            "disks": ["external"],
            "volume_type": "JBOD",
            "max_data_part_size": 0,
            "move_factor": 0.1,
            "prefer_not_to_merge": 0,
            "perform_ttl_move_on_insert": 1,
            "load_balancing": "ROUND_ROBIN",
        },
        {
            "policy_name": "special_warning_policy",
            "volume_name": "special_warning_small_volume",
            "volume_priority": 3,
            "disks": ["jbod1"],
            "volume_type": "JBOD",
            "max_data_part_size": 1024,
            "move_factor": 0.1,
            "prefer_not_to_merge": 0,
            "perform_ttl_move_on_insert": 1,
            "load_balancing": "ROUND_ROBIN",
        },
        {
            "policy_name": "special_warning_policy",
            "volume_name": "special_warning_big_volume",
            "volume_priority": 4,
            "disks": ["jbod2"],
            "volume_type": "JBOD",
            "max_data_part_size": 1024000000,
            "move_factor": 0.1,
            "prefer_not_to_merge": 0,
            "perform_ttl_move_on_insert": 1,
            "load_balancing": "ROUND_ROBIN",
        },
    ]

    clickhouse_policies_data = json.loads(
        node1.query(
            "SELECT * FROM system.storage_policies WHERE policy_name != 'default' FORMAT JSON"
        )
    )["data"]

    def key(x):
        return (x["policy_name"], x["volume_name"], x["volume_priority"])

    assert sorted(clickhouse_policies_data, key=key) == sorted(
        expected_policies_data, key=key
    )


def test_query_parser(start_cluster):
    try:
        with pytest.raises(QueryRuntimeException):
            node1.query(
                """
                CREATE TABLE IF NOT EXISTS table_with_absent_policy (
                    d UInt64
                ) ENGINE = MergeTree()
                ORDER BY d
                SETTINGS storage_policy='very_exciting_policy'
            """
            )

        with pytest.raises(QueryRuntimeException):
            node1.query(
                """
                CREATE TABLE IF NOT EXISTS table_with_absent_policy (
                    d UInt64
                ) ENGINE = MergeTree()
                ORDER BY d
                SETTINGS storage_policy='jbod1'
            """
            )

        node1.query(
            """
                CREATE TABLE IF NOT EXISTS table_with_normal_policy (
                    d UInt64
                ) ENGINE = MergeTree()
                ORDER BY d
                SETTINGS storage_policy='default'
        """
        )

        node1.query("INSERT INTO table_with_normal_policy VALUES (5)")

        with pytest.raises(QueryRuntimeException):
            node1.query(
                "ALTER TABLE table_with_normal_policy MOVE PARTITION tuple() TO VOLUME 'some_volume'"
            )

        with pytest.raises(QueryRuntimeException):
            node1.query(
                "ALTER TABLE table_with_normal_policy MOVE PARTITION tuple() TO DISK 'some_volume'"
            )

        with pytest.raises(QueryRuntimeException):
            node1.query(
                "ALTER TABLE table_with_normal_policy MOVE PART 'xxxxx' TO DISK 'jbod1'"
            )

        with pytest.raises(QueryRuntimeException):
            node1.query(
                "ALTER TABLE table_with_normal_policy MOVE PARTITION 'yyyy' TO DISK 'jbod1'"
            )

        with pytest.raises(QueryRuntimeException):
            node1.query(
                "ALTER TABLE table_with_normal_policy MODIFY SETTING storage_policy='moving_jbod_with_external'"
            )
    finally:
        node1.query("DROP TABLE IF EXISTS table_with_normal_policy SYNC")


@pytest.mark.parametrize(
    "name,engine",
    [
        pytest.param("test_alter_policy", "MergeTree()", id="mt"),
        pytest.param(
            "replicated_test_alter_policy",
            "ReplicatedMergeTree('/clickhouse/test_alter_policy', '1')",
            id="replicated",
        ),
    ],
)
def test_alter_policy(start_cluster, name, engine):
    try:
        node1.query_with_retry(
            """
            CREATE TABLE IF NOT EXISTS {name} (
                d UInt64
            ) ENGINE = {engine}
            ORDER BY d
            SETTINGS storage_policy='small_jbod_with_external'
        """.format(
                name=name, engine=engine
            )
        )

        assert (
            node1.query(
                """SELECT storage_policy FROM system.tables WHERE name = '{name}'""".format(
                    name=name
                )
            )
            == "small_jbod_with_external\n"
        )

        with pytest.raises(QueryRuntimeException):
            node1.query(
                """ALTER TABLE {name} MODIFY SETTING storage_policy='one_more_small_jbod_with_external'""".format(
                    name=name
                )
            )

        assert (
            node1.query(
                """SELECT storage_policy FROM system.tables WHERE name = '{name}'""".format(
                    name=name
                )
            )
            == "small_jbod_with_external\n"
        )

        node1.query_with_retry(
            """ALTER TABLE {name} MODIFY SETTING storage_policy='jbods_with_external'""".format(
                name=name
            )
        )

        assert (
            node1.query(
                """SELECT storage_policy FROM system.tables WHERE name = '{name}'""".format(
                    name=name
                )
            )
            == "jbods_with_external\n"
        )

        with pytest.raises(QueryRuntimeException):
            node1.query(
                """ALTER TABLE {name} MODIFY SETTING storage_policy='small_jbod_with_external'""".format(
                    name=name
                )
            )

        assert (
            node1.query(
                """SELECT storage_policy FROM system.tables WHERE name = '{name}'""".format(
                    name=name
                )
            )
            == "jbods_with_external\n"
        )

    finally:
        node1.query_with_retry(f"DROP TABLE IF EXISTS {name} SYNC")


def get_random_string(length):
    return "randomPrintableASCII({})".format(length)


def get_used_disks_for_table(node, table_name):
    return tuple(
        node.query(
            "select disk_name from system.parts where table == '{}' and active=1 order by modification_time".format(
                table_name
            )
        )
        .strip()
        .split("\n")
    )


def get_used_parts_for_table(node, table_name):
    return node.query(
        "SELECT name FROM system.parts WHERE table = '{}' AND active = 1 ORDER BY modification_time".format(
            table_name
        )
    ).splitlines()


def test_no_warning_about_zero_max_data_part_size(start_cluster):
    def get_log(node):
        return node.exec_in_container(
            ["bash", "-c", "cat /var/log/clickhouse-server/clickhouse-server.log"]
        )

    for node in (node1, node2):
        node.query(
            """
            CREATE TABLE IF NOT EXISTS default.test_warning_table (
                s String
            ) ENGINE = MergeTree
            ORDER BY tuple()
            SETTINGS storage_policy='small_jbod_with_external'
        """
        )
        node.query("DROP TABLE IF EXISTS default.test_warning_table SYNC")
        log = get_log(node)
        assert not re.search("Warning.*Volume.*special_warning_zero_volume", log)
        assert not re.search("Warning.*Volume.*special_warning_default_volume", log)
        assert re.search("Warning.*Volume.*special_warning_small_volume", log)
        assert not re.search("Warning.*Volume.*special_warning_big_volume", log)


@pytest.mark.parametrize(
    "name,engine",
    [
        pytest.param("mt_on_jbod", "MergeTree()", id="mt"),
        pytest.param(
            "replicated_mt_on_jbod",
            "ReplicatedMergeTree('/clickhouse/replicated_mt_on_jbod', '1')",
            id="replicated",
        ),
    ],
)
def test_round_robin(start_cluster, name, engine):
    try:
        node1.query_with_retry(
            """
            CREATE TABLE IF NOT EXISTS {name} (
                d UInt64
            ) ENGINE = {engine}
            ORDER BY d
            SETTINGS storage_policy='jbods_with_external'
        """.format(
                name=name, engine=engine
            )
        )

        # first should go to the jbod1
        node1.query_with_retry(
            "insert into {} select * from numbers(10000)".format(name)
        )
        used_disk = get_used_disks_for_table(node1, name)
        assert len(used_disk) == 1, "More than one disk used for single insert"

        # sleep is required because we order disks by their modification time, and if insert will be fast
        # modification time of two disks will be equal, then sort will not provide deterministic results
        time.sleep(5)

        node1.query_with_retry(
            "insert into {} select * from numbers(10000, 10000)".format(name)
        )
        used_disks = get_used_disks_for_table(node1, name)

        assert len(used_disks) == 2, "Two disks should be used for two parts"
        assert used_disks[0] != used_disks[1], "Should write to different disks"

        time.sleep(5)

        node1.query_with_retry(
            "insert into {} select * from numbers(20000, 10000)".format(name)
        )
        used_disks = get_used_disks_for_table(node1, name)

        # jbod1 -> jbod2 -> jbod1 -> jbod2 ... etc
        assert len(used_disks) == 3
        assert used_disks[0] != used_disks[1]
        assert used_disks[2] == used_disks[0]
    finally:
        node1.query_with_retry(f"DROP TABLE IF EXISTS {name} SYNC")


@pytest.mark.parametrize(
    "name,engine",
    [
        pytest.param("mt_with_huge_part", "MergeTree()", id="mt"),
        pytest.param(
            "replicated_mt_with_huge_part",
            "ReplicatedMergeTree('/clickhouse/replicated_mt_with_huge_part', '1')",
            id="replicated",
        ),
    ],
)
def test_max_data_part_size(start_cluster, name, engine):
    try:
        assert (
            int(
                *node1.query(
                    """SELECT max_data_part_size FROM system.storage_policies WHERE policy_name = 'jbods_with_external' AND volume_name = 'main'"""
                ).splitlines()
            )
            == 10 * 1024 * 1024
        )

        node1.query_with_retry(
            """
            CREATE TABLE IF NOT EXISTS {name} (
                s1 String
            ) ENGINE = {engine}
            ORDER BY tuple()
            SETTINGS storage_policy='jbods_with_external'
        """.format(
                name=name, engine=engine
            )
        )
        data = []  # 10MB in total
        for i in range(10):
            data.append(get_random_string(1024 * 1024))  # 1MB row

        node1.query_with_retry(
            "INSERT INTO {} VALUES {}".format(
                name, ",".join(["(" + x + ")" for x in data])
            )
        )
        used_disks = get_used_disks_for_table(node1, name)
        assert len(used_disks) == 1
        assert used_disks[0] == "external"
    finally:
        node1.query_with_retry(f"DROP TABLE IF EXISTS {name} SYNC")


@pytest.mark.parametrize(
    "name,engine",
    [
        pytest.param("mt_with_overflow", "MergeTree()", id="mt"),
        pytest.param(
            "replicated_mt_with_overflow",
            "ReplicatedMergeTree('/clickhouse/replicated_mt_with_overflow', '1')",
            id="replicated",
        ),
    ],
)
def test_jbod_overflow(start_cluster, name, engine):
    try:
        node1.query_with_retry(
            """
            CREATE TABLE IF NOT EXISTS {name} (
                s1 String
            ) ENGINE = {engine}
            ORDER BY tuple()
            SETTINGS storage_policy='small_jbod_with_external'
        """.format(
                name=name, engine=engine
            )
        )

        node1.query(f"SYSTEM STOP MERGES {name}")
        # The test tries to utilize 35/40=87.5% of space, while during last
        # INSERT parts mover may see up to ~100% of used space on disk due to
        # reservations (since INSERT first reserves the space and later write
        # the same, more or less, amount of space, and util the reservation had
        # been destroyed it will be taken into account as reserved on the
        # disk).
        node1.query(f"SYSTEM STOP MOVES {name}")

        # small jbod size is 40MB, so lets insert 5MB batch 7 times
        for i in range(7):
            data = []  # 5MB in total
            for i in range(5):
                data.append(get_random_string(1024 * 1024))  # 1MB row
            node1.query_with_retry(
                "INSERT INTO {} VALUES {}".format(
                    name, ",".join(["(" + x + ")" for x in data])
                )
            )

        used_disks = get_used_disks_for_table(node1, name)
        assert used_disks == tuple("jbod1" for _ in used_disks)

        # should go to the external disk (jbod is overflown)
        data = []  # 10MB in total
        for i in range(10):
            data.append(get_random_string(1024 * 1024))  # 1MB row

        node1.query_with_retry(
            "INSERT INTO {} VALUES {}".format(
                name, ",".join(["(" + x + ")" for x in data])
            )
        )

        used_disks = get_used_disks_for_table(node1, name)

        assert used_disks[-1] == "external"

        node1.query(f"SYSTEM START MERGES {name}")
        node1.query(f"SYSTEM START MOVES {name}")
        time.sleep(1)

        node1.query_with_retry("OPTIMIZE TABLE {} FINAL".format(name))
        time.sleep(2)

        disks_for_merges = tuple(
            node1.query(
                "SELECT disk_name FROM system.parts WHERE table == '{}' AND level >= 1 and active = 1 ORDER BY modification_time".format(
                    name
                )
            )
            .strip()
            .split("\n")
        )

        assert disks_for_merges == tuple("external" for _ in disks_for_merges)

    finally:
        node1.query_with_retry(f"DROP TABLE IF EXISTS {name} SYNC")


@pytest.mark.parametrize(
    "name,engine",
    [
        pytest.param("moving_mt", "MergeTree()", id="mt"),
        pytest.param(
            "moving_replicated_mt",
            "ReplicatedMergeTree('/clickhouse/moving_replicated_mt', '1')",
            id="replicated",
        ),
    ],
)
def test_background_move(start_cluster, name, engine):
    try:
        node1.query_with_retry(
            f"""
            CREATE TABLE IF NOT EXISTS {name} (
                s1 String
            ) ENGINE = {engine}
            ORDER BY tuple()
            SETTINGS storage_policy='moving_jbod_with_external', max_replicated_merges_in_queue=0
        """
        )

        node1.query(f"SYSTEM STOP MERGES {name}")

        first_part = None
        for i in range(5):
            data = []  # 5MB in total
            for _ in range(5):
                data.append(get_random_string(1024 * 1024))  # 1MB row
            # small jbod size is 40MB, so lets insert 5MB batch 5 times
            node1.query_with_retry(
                "INSERT INTO {} VALUES {}".format(
                    name, ",".join(["(" + x + ")" for x in data])
                )
            )

            # we are doing moves in parallel so we need to fetch the name of first part before we add new parts
            if i == 0:
                first_part = get_oldest_part(node1, name)

        assert first_part is not None

        retry = 20
        i = 0
        # multiple moves can be assigned in parallel so we can move later parts before the oldest
        # we need to wait explicitly until the oldest part is moved
        while get_disk_for_part(node1, name, first_part) != "external" and i < retry:
            time.sleep(0.5)
            i += 1

        # first (oldest) part was moved to external
        assert get_disk_for_part(node1, name, first_part) == "external"

        node1.query("SYSTEM FLUSH LOGS")
        path = node1.query(
            f"SELECT path_on_disk FROM system.part_log WHERE table = '{name}' AND event_type='MovePart' AND part_name = '{first_part}'"
        )

        # first (oldest) part was moved to external
        assert path.startswith("/external")

        node1.query(f"SYSTEM START MERGES {name}")

    finally:
        node1.query_with_retry(f"DROP TABLE IF EXISTS {name} SYNC")


@pytest.mark.parametrize(
    "name,engine",
    [
        pytest.param("stopped_moving_mt", "MergeTree()", id="mt"),
        pytest.param(
            "stopped_moving_replicated_mt",
            "ReplicatedMergeTree('/clickhouse/stopped_moving_replicated_mt', '1')",
            id="replicated",
        ),
    ],
)
def test_start_stop_moves(start_cluster, name, engine):
    try:
        node1.query_with_retry(
            f"""
            CREATE TABLE IF NOT EXISTS {name} (
                s1 String
            ) ENGINE = {engine}
            ORDER BY tuple()
            SETTINGS storage_policy='moving_jbod_with_external', max_replicated_merges_in_queue=0
        """
        )

        node1.query_with_retry(f"INSERT INTO {name} VALUES ('HELLO')")
        node1.query_with_retry(f"INSERT INTO {name} VALUES ('WORLD')")

        used_disks = get_used_disks_for_table(node1, name)
        assert all(d == "jbod1" for d in used_disks), "All writes shoud go to jbods"

        first_part = get_oldest_part(node1, name)

        node1.query("SYSTEM STOP MOVES")

        with pytest.raises(QueryRuntimeException):
            node1.query(
                f"ALTER TABLE {name} MOVE PART '{first_part}' TO VOLUME 'external'"
            )

        used_disks = get_used_disks_for_table(node1, name)
        assert all(
            d == "jbod1" for d in used_disks
        ), "Blocked moves doesn't actually move something"

        node1.query("SYSTEM START MOVES")

        node1.query(f"ALTER TABLE {name} MOVE PART '{first_part}' TO VOLUME 'external'")

        disk = node1.query(
            f"SELECT disk_name FROM system.parts WHERE table = '{name}' and name = '{first_part}' and active = 1"
        ).strip()

        assert disk == "external"

        node1.query_with_retry(f"TRUNCATE TABLE {name}")

        node1.query(f"SYSTEM STOP MOVES {name}")
        node1.query(f"SYSTEM STOP MERGES {name}")

        first_part = None
        for i in range(5):
            data = []  # 5MB in total
            for _ in range(5):
                data.append(get_random_string(1024 * 1024))  # 1MB row
            # jbod size is 40MB, so lets insert 5MB batch 7 times
            node1.query_with_retry(
                "INSERT INTO {} VALUES {}".format(
                    name, ",".join(["(" + x + ")" for x in data])
                )
            )

            # we cannot rely simply on modification time of part because it can be changed
            # by different background operations so we explicitly check after the first
            # part is inserted
            if i == 0:
                first_part = get_oldest_part(node1, name)

        assert first_part is not None

        used_disks = get_used_disks_for_table(node1, name)

        retry = 5
        i = 0
        while not sum(1 for x in used_disks if x == "jbod1") <= 2 and i < retry:
            time.sleep(0.1)
            used_disks = get_used_disks_for_table(node1, name)
            i += 1

        # first (oldest) part doesn't move anywhere
        assert used_disks[0] == "jbod1"

        node1.query(f"SYSTEM START MOVES {name}")

        # multiple moves can be assigned in parallel so we can move later parts before the oldest
        # we need to wait explicitly until the oldest part is moved
        retry = 60
        i = 0
        while get_disk_for_part(node1, name, first_part) != "external" and i < retry:
            time.sleep(1)
            i += 1

        # first (oldest) part moved to external
        assert get_disk_for_part(node1, name, first_part) == "external"

        node1.query(f"SYSTEM START MERGES {name}")
    finally:
        node1.query_with_retry(f"DROP TABLE IF EXISTS {name} SYNC")


def get_path_for_part_from_part_log(node, table, part_name):
    node.query("SYSTEM FLUSH LOGS")
    path = node.query(
        "SELECT path_on_disk FROM system.part_log WHERE table = '{}' and part_name = '{}' ORDER BY event_time DESC LIMIT 1".format(
            table, part_name
        )
    )
    return path.strip()


def get_paths_for_partition_from_part_log(node, table, partition_id):
    node.query("SYSTEM FLUSH LOGS")
    paths = node.query(
        "SELECT path_on_disk FROM system.part_log WHERE table = '{}' and partition_id = '{}' ORDER BY event_time DESC".format(
            table, partition_id
        )
    )
    return paths.strip().split("\n")


@pytest.mark.parametrize(
    "name,engine",
    [
        pytest.param("altering_mt", "MergeTree()", id="mt"),
    ],
)
def test_alter_move(start_cluster, name, engine):
    try:
        node1.query(
            """
            CREATE TABLE IF NOT EXISTS {name} (
                EventDate Date,
                number UInt64
            ) ENGINE = {engine}
            ORDER BY tuple()
            PARTITION BY toYYYYMM(EventDate)
            SETTINGS storage_policy='jbods_with_external'
        """.format(
                name=name, engine=engine
            )
        )

        node1.query("SYSTEM STOP MERGES {}".format(name))  # to avoid conflicts

        node1.query("INSERT INTO {} VALUES(toDate('2019-03-15'), 65)".format(name))
        node1.query("INSERT INTO {} VALUES(toDate('2019-03-16'), 66)".format(name))
        node1.query("INSERT INTO {} VALUES(toDate('2019-04-10'), 42)".format(name))
        node1.query("INSERT INTO {} VALUES(toDate('2019-04-11'), 43)".format(name))
        assert node1.query("CHECK TABLE " + name) == "1\n"

        used_disks = get_used_disks_for_table(node1, name)
        assert all(
            d.startswith("jbod") for d in used_disks
        ), "All writes should go to jbods"

        first_part = node1.query(
            "SELECT name FROM system.parts WHERE table = '{}' and active = 1 ORDER BY modification_time LIMIT 1".format(
                name
            )
        ).strip()

        time.sleep(1)
        node1.query(
            "ALTER TABLE {} MOVE PART '{}' TO VOLUME 'external'".format(
                name, first_part
            )
        )
        assert node1.query("CHECK TABLE " + name) == "1\n"
        disk = node1.query(
            "SELECT disk_name FROM system.parts WHERE table = '{}' and name = '{}' and active = 1".format(
                name, first_part
            )
        ).strip()
        assert disk == "external"
        assert get_path_for_part_from_part_log(node1, name, first_part).startswith(
            "/external"
        )

        time.sleep(1)
        node1.query(
            "ALTER TABLE {} MOVE PART '{}' TO DISK 'jbod1'".format(name, first_part)
        )
        assert node1.query("CHECK TABLE " + name) == "1\n"
        disk = node1.query(
            "SELECT disk_name FROM system.parts WHERE table = '{}' and name = '{}' and active = 1".format(
                name, first_part
            )
        ).strip()
        assert disk == "jbod1"
        assert get_path_for_part_from_part_log(node1, name, first_part).startswith(
            "/jbod1"
        )

        time.sleep(1)
        node1.query(
            "ALTER TABLE {} MOVE PARTITION 201904 TO VOLUME 'external'".format(name)
        )
        assert node1.query("CHECK TABLE " + name) == "1\n"
        disks = (
            node1.query(
                "SELECT disk_name FROM system.parts WHERE table = '{}' and partition = '201904' and active = 1".format(
                    name
                )
            )
            .strip()
            .split("\n")
        )
        assert len(disks) == 2
        assert all(d == "external" for d in disks)
        assert all(
            path.startswith("/external")
            for path in get_paths_for_partition_from_part_log(node1, name, "201904")[:2]
        )

        time.sleep(1)
        node1.query("ALTER TABLE {} MOVE PARTITION 201904 TO DISK 'jbod2'".format(name))
        assert node1.query("CHECK TABLE " + name) == "1\n"
        disks = (
            node1.query(
                "SELECT disk_name FROM system.parts WHERE table = '{}' and partition = '201904' and active = 1".format(
                    name
                )
            )
            .strip()
            .split("\n")
        )
        assert len(disks) == 2
        assert all(d == "jbod2" for d in disks)
        assert all(
            path.startswith("/jbod2")
            for path in get_paths_for_partition_from_part_log(node1, name, "201904")[:2]
        )

        assert node1.query("SELECT COUNT() FROM {}".format(name)) == "4\n"

    finally:
        node1.query(f"DROP TABLE IF EXISTS {name} SYNC")


@pytest.mark.parametrize("volume_or_disk", ["DISK", "VOLUME"])
def test_alter_move_half_of_partition(start_cluster, volume_or_disk):
    name = "alter_move_half_of_partition"
    engine = "MergeTree()"
    try:
        node1.query(
            """
            CREATE TABLE IF NOT EXISTS {name} (
                EventDate Date,
                number UInt64
            ) ENGINE = {engine}
            ORDER BY tuple()
            PARTITION BY toYYYYMM(EventDate)
            SETTINGS storage_policy='jbods_with_external'
        """.format(
                name=name, engine=engine
            )
        )

        node1.query("SYSTEM STOP MERGES {}".format(name))

        node1.query("INSERT INTO {} VALUES(toDate('2019-03-15'), 65)".format(name))
        node1.query("INSERT INTO {} VALUES(toDate('2019-03-16'), 42)".format(name))
        used_disks = get_used_disks_for_table(node1, name)
        assert all(
            d.startswith("jbod") for d in used_disks
        ), "All writes should go to jbods"

        time.sleep(1)
        parts = node1.query(
            "SELECT name FROM system.parts WHERE table = '{}' and active = 1".format(
                name
            )
        ).splitlines()
        assert len(parts) == 2

        node1.query(
            "ALTER TABLE {} MOVE PART '{}' TO VOLUME 'external'".format(name, parts[0])
        )
        disks = node1.query(
            "SELECT disk_name FROM system.parts WHERE table = '{}' and name = '{}' and active = 1".format(
                name, parts[0]
            )
        ).splitlines()
        assert disks == ["external"]

        time.sleep(1)
        node1.query(
            "ALTER TABLE {} MOVE PARTITION 201903 TO {volume_or_disk} 'external'".format(
                name, volume_or_disk=volume_or_disk
            )
        )
        disks = node1.query(
            "SELECT disk_name FROM system.parts WHERE table = '{}' and partition = '201903' and active = 1".format(
                name
            )
        ).splitlines()
        assert disks == ["external"] * 2

        assert node1.query("SELECT COUNT() FROM {}".format(name)) == "2\n"

    finally:
        node1.query(f"DROP TABLE IF EXISTS {name} SYNC")


@pytest.mark.parametrize("volume_or_disk", ["DISK", "VOLUME"])
def test_alter_double_move_partition(start_cluster, volume_or_disk):
    name = "alter_double_move_partition"
    engine = "MergeTree()"
    try:
        node1.query(
            """
            CREATE TABLE IF NOT EXISTS {name} (
                EventDate Date,
                number UInt64
            ) ENGINE = {engine}
            ORDER BY tuple()
            PARTITION BY toYYYYMM(EventDate)
            SETTINGS storage_policy='jbods_with_external'
        """.format(
                name=name, engine=engine
            )
        )

        node1.query("SYSTEM STOP MERGES {}".format(name))

        node1.query("INSERT INTO {} VALUES(toDate('2019-03-15'), 65)".format(name))
        node1.query("INSERT INTO {} VALUES(toDate('2019-03-16'), 42)".format(name))
        used_disks = get_used_disks_for_table(node1, name)
        assert all(
            d.startswith("jbod") for d in used_disks
        ), "All writes should go to jbods"

        time.sleep(1)
        node1.query(
            "ALTER TABLE {} MOVE PARTITION 201903 TO {volume_or_disk} 'external'".format(
                name, volume_or_disk=volume_or_disk
            )
        )
        disks = node1.query(
            "SELECT disk_name FROM system.parts WHERE table = '{}' and partition = '201903' and active = 1".format(
                name
            )
        ).splitlines()
        assert disks == ["external"] * 2

        assert node1.query("SELECT COUNT() FROM {}".format(name)) == "2\n"

        time.sleep(1)
        with pytest.raises(QueryRuntimeException):
            node1.query(
                "ALTER TABLE {} MOVE PARTITION 201903 TO {volume_or_disk} 'external'".format(
                    name, volume_or_disk=volume_or_disk
                )
            )

    finally:
        node1.query(f"DROP TABLE IF EXISTS {name} SYNC")


def produce_alter_move(node, name):
    move_type = random.choice(["PART", "PARTITION"])
    if move_type == "PART":
        for _ in range(10):
            try:
                parts = (
                    node1.query(
                        "SELECT name from system.parts where table = '{}' and active = 1".format(
                            name
                        )
                    )
                    .strip()
                    .split("\n")
                )
                break
            except QueryRuntimeException:
                pass
        else:
            raise Exception("Cannot select from system.parts")

        move_part = random.choice(["'" + part + "'" for part in parts])
    else:
        move_part = random.choice([201903, 201904])

    move_disk = random.choice(["DISK", "VOLUME"])
    if move_disk == "DISK":
        move_volume = random.choice(["'external'", "'jbod1'", "'jbod2'"])
    else:
        move_volume = random.choice(["'main'", "'external'"])
    try:
        node1.query(
            "ALTER TABLE {} MOVE {mt} {mp} TO {md} {mv}".format(
                name, mt=move_type, mp=move_part, md=move_disk, mv=move_volume
            )
        )
    except QueryRuntimeException as ex:
        pass


@pytest.mark.parametrize(
    "name,engine",
    [
        pytest.param("detach_attach_mt", "MergeTree()", id="mt"),
        pytest.param(
            "replicated_detach_attach_mt",
            "ReplicatedMergeTree('/clickhouse/replicated_detach_attach_mt', '1')",
            id="replicated",
        ),
    ],
)
def test_detach_attach(start_cluster, name, engine):
    try:
        node1.query_with_retry(
            """
            CREATE TABLE IF NOT EXISTS {name} (
                s1 String
            ) ENGINE = {engine}
            ORDER BY tuple()
            SETTINGS storage_policy='moving_jbod_with_external'
        """.format(
                name=name, engine=engine
            )
        )

        data = []  # 5MB in total
        for i in range(5):
            data.append(get_random_string(1024 * 1024))  # 1MB row
        node1.query_with_retry(
            "INSERT INTO {} VALUES {}".format(
                name, ",".join(["(" + x + ")" for x in data])
            )
        )

        node1.query("ALTER TABLE {} DETACH PARTITION tuple()".format(name))
        assert node1.query("SELECT count() FROM {}".format(name)).strip() == "0"

        assert (
            node1.query(
                "SELECT disk FROM system.detached_parts WHERE table = '{}'".format(name)
            ).strip()
            == "jbod1"
        )

        node1.query("ALTER TABLE {} ATTACH PARTITION tuple()".format(name))
        assert node1.query("SELECT count() FROM {}".format(name)).strip() == "5"

    finally:
        node1.query_with_retry(f"DROP TABLE IF EXISTS {name} SYNC")


@pytest.mark.parametrize(
    "name,engine",
    [
        pytest.param("mutating_mt", "MergeTree()", id="mt"),
        pytest.param(
            "replicated_mutating_mt",
            "ReplicatedMergeTree('/clickhouse/replicated_mutating_mt', '1')",
            id="replicated",
        ),
    ],
)
def test_mutate_to_another_disk(start_cluster, name, engine):
    try:
        node1.query_with_retry(
            """
            CREATE TABLE IF NOT EXISTS {name} (
                s1 String
            ) ENGINE = {engine}
            ORDER BY tuple()
            SETTINGS storage_policy='moving_jbod_with_external'
        """.format(
                name=name, engine=engine
            )
        )

        for i in range(5):
            data = []  # 5MB in total
            for i in range(5):
                data.append(get_random_string(1024 * 1024))  # 1MB row
            node1.query_with_retry(
                "INSERT INTO {} VALUES {}".format(
                    name, ",".join(["(" + x + ")" for x in data])
                )
            )

        node1.query("ALTER TABLE {} UPDATE s1 = concat(s1, 'x') WHERE 1".format(name))

        retry = 20
        while (
            node1.query("SELECT * FROM system.mutations WHERE is_done = 0") != ""
            and retry > 0
        ):
            retry -= 1
            time.sleep(0.5)

        if (
            node1.query(
                "SELECT latest_fail_reason FROM system.mutations WHERE table = '{}'".format(
                    name
                )
            )
            == ""
        ):
            assert (
                node1.query("SELECT sum(endsWith(s1, 'x')) FROM {}".format(name))
                == "25\n"
            )
        else:  # mutation failed, let's try on another disk
            print("Mutation failed")
            node1.query_with_retry("OPTIMIZE TABLE {} FINAL".format(name))
            node1.query(
                "ALTER TABLE {} UPDATE s1 = concat(s1, 'x') WHERE 1".format(name)
            )
            retry = 20
            while (
                node1.query("SELECT * FROM system.mutations WHERE is_done = 0") != ""
                and retry > 0
            ):
                retry -= 1
                time.sleep(0.5)

            assert (
                node1.query("SELECT sum(endsWith(s1, 'x')) FROM {}".format(name))
                == "25\n"
            )

    finally:
        node1.query_with_retry(f"DROP TABLE IF EXISTS {name} SYNC")


@pytest.mark.parametrize(
    "name,engine",
    [
        pytest.param("alter_modifying_mt", "MergeTree()", id="mt"),
        pytest.param(
            "replicated_alter_modifying_mt",
            "ReplicatedMergeTree('/clickhouse/replicated_alter_modifying_mt', '1')",
            id="replicated",
        ),
    ],
)
def test_concurrent_alter_modify(start_cluster, name, engine):
    try:
        node1.query_with_retry(
            """
            CREATE TABLE IF NOT EXISTS {name} (
                EventDate Date,
                number UInt64
            ) ENGINE = {engine}
            ORDER BY tuple()
            PARTITION BY toYYYYMM(EventDate)
            SETTINGS storage_policy='jbods_with_external'
        """.format(
                name=name, engine=engine
            )
        )

        values = list({random.randint(1, 1000000) for _ in range(0, 1000)})

        def insert(num):
            for i in range(num):
                day = random.randint(11, 30)
                value = values.pop()
                month = "0" + str(random.choice([3, 4]))
                node1.query_with_retry(
                    "INSERT INTO {} VALUES(toDate('2019-{m}-{d}'), {v})".format(
                        name, m=month, d=day, v=value
                    )
                )

        def alter_move(num):
            for i in range(num):
                produce_alter_move(node1, name)

        def alter_modify(num):
            for i in range(num):
                column_type = random.choice(["UInt64", "String"])
                try:
                    node1.query(
                        "ALTER TABLE {} MODIFY COLUMN number {}".format(
                            name, column_type
                        )
                    )
                except:
                    if "Replicated" not in engine:
                        raise

        insert(100)

        assert node1.query("SELECT COUNT() FROM {}".format(name)) == "100\n"

        p = Pool(50)
        tasks = []
        for i in range(5):
            tasks.append(p.apply_async(alter_move, (100,)))
            tasks.append(p.apply_async(alter_modify, (100,)))

        for task in tasks:
            task.get(timeout=120)

        assert node1.query("SELECT 1") == "1\n"
        assert node1.query("SELECT COUNT() FROM {}".format(name)) == "100\n"

    finally:
        node1.query_with_retry(f"DROP TABLE IF EXISTS {name} SYNC")


def test_simple_replication_and_moves(start_cluster):
    try:
        for i, node in enumerate([node1, node2]):
            node.query_with_retry(
                """
                CREATE TABLE IF NOT EXISTS replicated_table_for_moves (
                    s1 String
                ) ENGINE = ReplicatedMergeTree('/clickhouse/replicated_table_for_moves', '{}')
                ORDER BY tuple()
                SETTINGS storage_policy='moving_jbod_with_external', old_parts_lifetime=1,
                cleanup_delay_period=1, cleanup_delay_period_random_add=2, cleanup_thread_preferred_points_per_iteration=0
            """.format(
                    i + 1
                )
            )

        def insert(num):
            for i in range(num):
                node = random.choice([node1, node2])
                data = []  # 1MB in total
                for i in range(2):
                    data.append(get_random_string(512 * 1024))  # 500KB value
                node.query_with_retry(
                    "INSERT INTO replicated_table_for_moves VALUES {}".format(
                        ",".join(["(" + x + ")" for x in data])
                    )
                )

        def optimize(num):
            for i in range(num):
                node = random.choice([node1, node2])
                node.query_with_retry("OPTIMIZE TABLE replicated_table_for_moves FINAL")

        p = Pool(60)
        tasks = []
        tasks.append(p.apply_async(insert, (20,)))
        tasks.append(p.apply_async(optimize, (20,)))

        for task in tasks:
            task.get(timeout=60)

        node1.query_with_retry(
            "SYSTEM SYNC REPLICA ON CLUSTER test_cluster replicated_table_for_moves",
            timeout=5,
        )

        node1.query("SELECT COUNT() FROM replicated_table_for_moves") == "40\n"
        node2.query("SELECT COUNT() FROM replicated_table_for_moves") == "40\n"

        data = []  # 1MB in total
        for i in range(2):
            data.append(get_random_string(512 * 1024))  # 500KB value

        time.sleep(3)  # wait until old parts will be deleted
        node1.query("SYSTEM STOP MERGES")
        node2.query("SYSTEM STOP MERGES")

        node1.query_with_retry(
            "INSERT INTO replicated_table_for_moves VALUES {}".format(
                ",".join(["(" + x + ")" for x in data])
            )
        )
        node2.query_with_retry(
            "INSERT INTO replicated_table_for_moves VALUES {}".format(
                ",".join(["(" + x + ")" for x in data])
            )
        )

        time.sleep(3)  # nothing was moved

        disks1 = get_used_disks_for_table(node1, "replicated_table_for_moves")
        disks2 = get_used_disks_for_table(node2, "replicated_table_for_moves")

        node2.query("SYSTEM START MERGES ON CLUSTER test_cluster")

        set(disks1) == set(["jbod1", "external"])
        set(disks2) == set(["jbod1", "external"])
    finally:
        for node in [node1, node2]:
            node.query("DROP TABLE IF EXISTS replicated_table_for_moves SYNC")


def test_download_appropriate_disk(start_cluster):
    try:
        for i, node in enumerate([node1, node2]):
            node.query_with_retry(
                """
                CREATE TABLE IF NOT EXISTS replicated_table_for_download (
                    s1 String
                ) ENGINE = ReplicatedMergeTree('/clickhouse/replicated_table_for_download', '{}')
                ORDER BY tuple()
                SETTINGS storage_policy='moving_jbod_with_external', old_parts_lifetime=1,
                cleanup_delay_period=1, cleanup_delay_period_random_add=2, cleanup_thread_preferred_points_per_iteration=0
            """.format(
                    i + 1
                )
            )

        data = []
        for i in range(50):
            data.append(get_random_string(1024 * 1024))  # 1MB value
        node1.query_with_retry(
            "INSERT INTO replicated_table_for_download VALUES {}".format(
                ",".join(["(" + x + ")" for x in data])
            )
        )

        for _ in range(10):
            try:
                print("Syncing replica")
                node2.query_with_retry(
                    "SYSTEM SYNC REPLICA replicated_table_for_download"
                )
                break
            except:
                time.sleep(0.5)

        disks2 = get_used_disks_for_table(node2, "replicated_table_for_download")

        assert set(disks2) == set(["external"])

    finally:
        for node in [node1, node2]:
            node.query_with_retry(
                "DROP TABLE IF EXISTS replicated_table_for_download SYNC"
            )


def test_rename(start_cluster):
    try:
        node1.query(
            """
            CREATE TABLE IF NOT EXISTS default.renaming_table (
                s String
            ) ENGINE = MergeTree
            ORDER BY tuple()
            SETTINGS storage_policy='small_jbod_with_external'
        """
        )

        # We want to check that after inserts, some parts were moved to external disk
        # and some parts are still on the main disk, but because of merge all parts
        # might end up on external disk.
        node1.query("SYSTEM STOP MERGES default.renaming_table")

        # jbod1 disk is 40mb
        for _ in range(5):
            data = []
            for i in range(10):
                data.append(get_random_string(1024 * 1024))  # 1MB value
            node1.query(
                "INSERT INTO renaming_table VALUES {}".format(
                    ",".join(["(" + x + ")" for x in data])
                )
            )

        # data is moved in the background, so check with retries
        num_try = 0
        while get_used_disks_for_table(node1, "renaming_table") == 1:
            time.sleep(1)
            num_try += 1
            if num_try == 20:
                break
        assert len(get_used_disks_for_table(node1, "renaming_table")) > 1
        assert node1.query("SELECT COUNT() FROM default.renaming_table") == "50\n"

        node1.query("RENAME TABLE default.renaming_table TO default.renaming_table1")
        assert node1.query("SELECT COUNT() FROM default.renaming_table1") == "50\n"

        with pytest.raises(QueryRuntimeException):
            node1.query("SELECT COUNT() FROM default.renaming_table")

        node1.query("CREATE DATABASE IF NOT EXISTS test")
        node1.query("RENAME TABLE default.renaming_table1 TO test.renaming_table2")
        assert node1.query("SELECT COUNT() FROM test.renaming_table2") == "50\n"

        with pytest.raises(QueryRuntimeException):
            node1.query("SELECT COUNT() FROM default.renaming_table1")

    finally:
        node1.query("DROP TABLE IF EXISTS default.renaming_table SYNC")
        node1.query("DROP TABLE IF EXISTS default.renaming_table1 SYNC")
        node1.query("DROP TABLE IF EXISTS test.renaming_table2 SYNC")


def test_freeze(start_cluster):
    try:
        node1.query(
            """
            CREATE TABLE IF NOT EXISTS default.freezing_table (
                d Date,
                s String
            ) ENGINE = MergeTree
            ORDER BY tuple()
            PARTITION BY toYYYYMM(d)
            SETTINGS storage_policy='small_jbod_with_external', compress_marks=false, compress_primary_key=false, ratio_of_defaults_for_sparse_serialization=1
        """
        )

        node1.query("SYSTEM STOP MERGES default.freezing_table")

        for _ in range(5):
            data = []
            dates = []
            for i in range(10):
                data.append(get_random_string(1024 * 1024))  # 1MB value
                dates.append("toDate('2019-03-05')")
            node1.query(
                "INSERT INTO freezing_table VALUES {}".format(
                    ",".join(["(" + d + ", " + s + ")" for d, s in zip(dates, data)])
                )
            )

        disks = get_used_disks_for_table(node1, "freezing_table")
        assert len(disks) > 1
        assert node1.query("SELECT COUNT() FROM default.freezing_table") == "50\n"

        node1.query("ALTER TABLE freezing_table FREEZE PARTITION 201903")
        # check shadow files (backups) exists
        node1.exec_in_container(
            ["bash", "-c", "find /jbod1/shadow -name '*.mrk2' | grep '.*'"]
        )
        node1.exec_in_container(
            ["bash", "-c", "find /external/shadow -name '*.mrk2' | grep '.*'"]
        )

    finally:
        node1.query("DROP TABLE IF EXISTS default.freezing_table SYNC")
        node1.exec_in_container(["rm", "-rf", "/jbod1/shadow", "/external/shadow"])


def test_kill_while_insert(start_cluster):
    try:
        name = "test_kill_while_insert"

        node1.query(
            """
            CREATE TABLE IF NOT EXISTS {name} (
                s String
            ) ENGINE = MergeTree
            ORDER BY tuple()
            SETTINGS storage_policy='small_jbod_with_external'
        """.format(
                name=name
            )
        )

        data = []
        dates = []
        for i in range(10):
            data.append(get_random_string(1024 * 1024))  # 1MB value
        node1.query(
            "INSERT INTO {name} VALUES {}".format(
                ",".join(["(" + s + ")" for s in data]), name=name
            )
        )

        disks = get_used_disks_for_table(node1, name)
        assert set(disks) == {"jbod1"}

        def ignore_exceptions(f, *args):
            try:
                f(*args)
            except:
                """(っಠ‿ಠ)っ"""

        start_time = time.time()
        long_select = threading.Thread(
            target=ignore_exceptions,
            args=(node1.query, "SELECT sleep(3) FROM {name}".format(name=name)),
        )
        long_select.start()

        sleep_start_time = time.time()
        time.sleep(0.5)
        # long SELECT query might have finished if sleep was too long
        assert time.time() - sleep_start_time < 1.5

        node1.query(
            "ALTER TABLE {name} MOVE PARTITION tuple() TO DISK 'external'".format(
                name=name
            )
        )
        assert time.time() - start_time < 2
        node1.restart_clickhouse(kill=True)

        try:
            long_select.join()
        except:
            """"""

        assert node1.query(
            "SELECT count() FROM {name}".format(name=name)
        ).splitlines() == ["10"]

    finally:
        try:
            node1.query(f"DROP TABLE IF EXISTS {name} SYNC")
        except:
            """ClickHouse may be inactive at this moment and we don't want to mask a meaningful exception."""


def test_move_while_merge(start_cluster):
    try:
        name = "test_move_while_merge"

        node1.query(
            """
            CREATE TABLE IF NOT EXISTS {name} (
                n Int64
            ) ENGINE = MergeTree
            ORDER BY sleep(2)
            SETTINGS storage_policy='small_jbod_with_external'
        """.format(
                name=name
            )
        )

        node1.query("INSERT INTO {name} VALUES (1)".format(name=name))
        node1.query("INSERT INTO {name} VALUES (2)".format(name=name))

        parts = get_used_parts_for_table(node1, name)
        assert len(parts) == 2

        def optimize():
            node1.query("OPTIMIZE TABLE {name}".format(name=name))

        optimize = threading.Thread(target=optimize)
        optimize.start()

        time.sleep(0.5)

        with pytest.raises(QueryRuntimeException):
            node1.query(
                "ALTER TABLE {name} MOVE PART '{part}' TO DISK 'external'".format(
                    name=name, part=parts[0]
                )
            )

        exiting = False
        no_exception = {}

        def alter():
            while not exiting:
                try:
                    node1.query(
                        "ALTER TABLE {name} MOVE PART '{part}' TO DISK 'external'".format(
                            name=name, part=parts[0]
                        )
                    )
                    no_exception["missing"] = "exception"
                    break
                except QueryRuntimeException:
                    """"""

        alter_thread = threading.Thread(target=alter)
        alter_thread.start()

        optimize.join()

        time.sleep(0.5)

        exiting = True
        alter_thread.join()
        assert len(no_exception) == 0

        assert node1.query(
            "SELECT count() FROM {name}".format(name=name)
        ).splitlines() == ["2"]

    finally:
        node1.query(f"DROP TABLE IF EXISTS {name} SYNC")


def test_move_across_policies_does_not_work(start_cluster):
    try:
        name = "test_move_across_policies_does_not_work"

        node1.query(
            """
            CREATE TABLE IF NOT EXISTS {name} (
                n Int64
            ) ENGINE = MergeTree
            ORDER BY tuple()
            SETTINGS storage_policy='jbods_with_external'
        """.format(
                name=name
            )
        )

        node1.query(
            """
            CREATE TABLE IF NOT EXISTS {name}2 (
                n Int64
            ) ENGINE = MergeTree
            ORDER BY tuple()
            SETTINGS storage_policy='small_jbod_with_external'
        """.format(
                name=name
            )
        )

        node1.query("""INSERT INTO {name} VALUES (1)""".format(name=name))
        try:
            node1.query(
                """ALTER TABLE {name} MOVE PARTITION tuple() TO DISK 'jbod2'""".format(
                    name=name
                )
            )
        except QueryRuntimeException:
            """All parts of partition 'all' are already on disk 'jbod2'."""

        # works when attach
        node1.query(
            """ALTER TABLE {name}2 ATTACH PARTITION tuple() FROM {name}""".format(
                name=name
            )
        )

        with pytest.raises(
            QueryRuntimeException,
            match=".*because disk does not belong to storage policy.*",
        ):
            node1.query(
                """ALTER TABLE {name}2 REPLACE PARTITION tuple() FROM {name}""".format(
                    name=name
                )
            )

        with pytest.raises(
            QueryRuntimeException,
            match=".*should have the same storage policy of source table.*",
        ):
            node1.query(
                """ALTER TABLE {name} MOVE PARTITION tuple() TO TABLE {name}2""".format(
                    name=name
                )
            )

        assert node1.query(
            """SELECT * FROM {name}2""".format(name=name)
        ).splitlines() == ["1"]

    finally:
        node1.query(f"DROP TABLE IF EXISTS {name} SYNC")
        node1.query(f"DROP TABLE IF EXISTS {name}2 SYNC")


def _insert_merge_execute(
    node, name, policy, parts, cmds, parts_before_cmds, parts_after_cmds
):
    try:
        node.query(
            """
            CREATE TABLE IF NOT EXISTS {name} (
                n Int64
            ) ENGINE = MergeTree
            ORDER BY tuple()
            PARTITION BY tuple()
            TTL now()-1 TO VOLUME 'external'
            SETTINGS storage_policy='{policy}'
        """.format(
                name=name, policy=policy
            ),
            settings={"allow_suspicious_ttl_expressions": 1},
        )

        for i in range(parts):
            node.query("""INSERT INTO {name} VALUES ({n})""".format(name=name, n=i))

        disks = get_used_disks_for_table(node, name)
        assert set(disks) == {"external"}

        node.query("""OPTIMIZE TABLE {name}""".format(name=name))

        parts = get_used_parts_for_table(node, name)
        assert len(parts) == parts_before_cmds

        for cmd in cmds:
            node.query(cmd)

        node.query("""OPTIMIZE TABLE {name}""".format(name=name))

        parts = get_used_parts_for_table(node, name)
        assert len(parts) == parts_after_cmds

    finally:
        node.query(f"DROP TABLE IF EXISTS {name} SYNC")


def _check_merges_are_working(node, storage_policy, volume, shall_work):
    try:
        name = "_check_merges_are_working_{storage_policy}_{volume}".format(
            storage_policy=storage_policy, volume=volume
        )

        node.query(
            """
            CREATE TABLE IF NOT EXISTS {name} (
                n Int64
            ) ENGINE = MergeTree
            ORDER BY tuple()
            PARTITION BY tuple()
            SETTINGS storage_policy='{storage_policy}'
        """.format(
                name=name, storage_policy=storage_policy
            )
        )

        created_parts = 24

        for i in range(created_parts):
            node.query("""INSERT INTO {name} VALUES ({n})""".format(name=name, n=i))
            try:
                node.query(
                    """ALTER TABLE {name} MOVE PARTITION tuple() TO VOLUME '{volume}' """.format(
                        name=name, volume=volume
                    )
                )
            except:
                """Ignore 'nothing to move'."""

        expected_disks = set(
            node.query(
                """
            SELECT disks FROM system.storage_policies ARRAY JOIN disks WHERE volume_name = '{volume_name}'
        """.format(
                    volume_name=volume
                )
            ).splitlines()
        )

        disks = get_used_disks_for_table(node, name)
        assert set(disks) <= expected_disks

        node.query("""OPTIMIZE TABLE {name} FINAL""".format(name=name))

        parts = get_used_parts_for_table(node, name)
        assert len(parts) == 1 if shall_work else created_parts

    finally:
        node.query(f"DROP TABLE IF EXISTS {name} SYNC")


def _get_prefer_not_to_merge_for_storage_policy(node, storage_policy):
    return list(
        map(
            int,
            node.query(
                "SELECT prefer_not_to_merge FROM system.storage_policies WHERE policy_name = '{}' ORDER BY volume_priority".format(
                    storage_policy
                )
            ).splitlines(),
        )
    )


def test_simple_merge_tree_merges_are_disabled(start_cluster):
    _check_merges_are_working(
        node1, "small_jbod_with_external_no_merges", "external", False
    )


def test_no_merges_in_configuration_allow_from_query_without_reload(start_cluster):
    try:
        name = "test_no_merges_in_configuration_allow_from_query_without_reload"
        policy = "small_jbod_with_external_no_merges"
        node1.restart_clickhouse(kill=True)
        assert _get_prefer_not_to_merge_for_storage_policy(node1, policy) == [0, 1]
        _check_merges_are_working(node1, policy, "external", False)

        _insert_merge_execute(
            node1,
            name,
            policy,
            2,
            ["SYSTEM START MERGES ON VOLUME {}.external".format(policy)],
            2,
            1,
        )
        assert _get_prefer_not_to_merge_for_storage_policy(node1, policy) == [0, 0]
        _check_merges_are_working(node1, policy, "external", True)

    finally:
        node1.query("SYSTEM STOP MERGES ON VOLUME {}.external".format(policy))


def test_no_merges_in_configuration_allow_from_query_with_reload(start_cluster):
    try:
        name = "test_no_merges_in_configuration_allow_from_query_with_reload"
        policy = "small_jbod_with_external_no_merges"
        node1.restart_clickhouse(kill=True)
        assert _get_prefer_not_to_merge_for_storage_policy(node1, policy) == [0, 1]
        _check_merges_are_working(node1, policy, "external", False)

        _insert_merge_execute(
            node1,
            name,
            policy,
            2,
            [
                "SYSTEM START MERGES ON VOLUME {}.external".format(policy),
                "SYSTEM RELOAD CONFIG",
            ],
            2,
            1,
        )
        assert _get_prefer_not_to_merge_for_storage_policy(node1, policy) == [0, 0]
        _check_merges_are_working(node1, policy, "external", True)

    finally:
        node1.query("SYSTEM STOP MERGES ON VOLUME {}.external".format(policy))


def test_no_merges_in_configuration_allow_from_query_with_reload_on_cluster(
    start_cluster,
):
    try:
        name = "test_no_merges_in_configuration_allow_from_query_with_reload"
        policy = "small_jbod_with_external_no_merges"
        node1.restart_clickhouse(kill=True)
        assert _get_prefer_not_to_merge_for_storage_policy(node1, policy) == [0, 1]
        _check_merges_are_working(node1, policy, "external", False)

        _insert_merge_execute(
            node1,
            name,
            policy,
            2,
            [
                "SYSTEM START MERGES ON CLUSTER test_cluster ON VOLUME {}.external".format(
                    policy
                ),
                "SYSTEM RELOAD CONFIG ON CLUSTER test_cluster",
            ],
            2,
            1,
        )
        assert _get_prefer_not_to_merge_for_storage_policy(node1, policy) == [0, 0]
        _check_merges_are_working(node1, policy, "external", True)

    finally:
        node1.query(
            "SYSTEM STOP MERGES ON CLUSTER test_cluster ON VOLUME {}.external".format(
                policy
            )
        )


def test_yes_merges_in_configuration_disallow_from_query_without_reload(start_cluster):
    try:
        name = "test_yes_merges_in_configuration_allow_from_query_without_reload"
        policy = "small_jbod_with_external"
        node1.restart_clickhouse(kill=True)
        assert _get_prefer_not_to_merge_for_storage_policy(node1, policy) == [0, 0]
        _check_merges_are_working(node1, policy, "external", True)

        _insert_merge_execute(
            node1,
            name,
            policy,
            2,
            [
                "SYSTEM STOP MERGES ON VOLUME {}.external".format(policy),
                "INSERT INTO {name} VALUES (2)".format(name=name),
            ],
            1,
            2,
        )
        assert _get_prefer_not_to_merge_for_storage_policy(node1, policy) == [0, 1]
        _check_merges_are_working(node1, policy, "external", False)

    finally:
        node1.query("SYSTEM START MERGES ON VOLUME {}.external".format(policy))


def test_yes_merges_in_configuration_disallow_from_query_with_reload(start_cluster):
    try:
        name = "test_yes_merges_in_configuration_allow_from_query_with_reload"
        policy = "small_jbod_with_external"
        node1.restart_clickhouse(kill=True)
        assert _get_prefer_not_to_merge_for_storage_policy(node1, policy) == [0, 0]
        _check_merges_are_working(node1, policy, "external", True)

        _insert_merge_execute(
            node1,
            name,
            policy,
            2,
            [
                "SYSTEM STOP MERGES ON VOLUME {}.external".format(policy),
                "INSERT INTO {name} VALUES (2)".format(name=name),
                "SYSTEM RELOAD CONFIG",
            ],
            1,
            2,
        )
        assert _get_prefer_not_to_merge_for_storage_policy(node1, policy) == [0, 1]
        _check_merges_are_working(node1, policy, "external", False)

    finally:
        node1.query("SYSTEM START MERGES ON VOLUME {}.external".format(policy))
